Jetty的网络IO模型

网络上有不少分析jetty网络模型的文章,但是对应的源码都相对较老,本文基于jetty 9.2.16结合代码进行简单分析。

Java Nio中的selector

Selector是Java NIO中能够同时检测多个Channel的IO事件的组件,与传统IO不同,selector只检测这些IO事件的就绪,并不真正完成这些事件。

Selector可以在指定的channel上监听感兴趣的IO事件类型,通常包括SelectionKey.OP_READ(读取数据),SelectionKey.OP_WRITE(写入数据),SelectionKey.OP_CONNECT(发起连接),SelectionKey.OP_ACCEPT(接受连接)四种事件类型。

Jetty中的网络IO模型很大程度上依赖了Selector的功能。

Acceptor

Jetty中网络请求的入口通常是Acceptor,顾名思义,Acceptor用于接受请求,在Jetty的实现中,有独立的Acceptor线程用于处理接受网络请求这一事件。

在ServerConnector类中,有一个_acceptors的线程数组

在Connector start的时候, 会根据_acceptors数组的长度创建对应数量的Acceptor,

1
2
3
4
5
6
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
getExecutor().execute(a);
}

Acceptor是ServerConnector中的一个内部类,同时是一个Runnable,在执行run方法时,它会自己_acceptors数组中对应下标的Thread

Acceptor线程是通过getExecutor()得到的线程池创建的,因此它占用我们设置的jetty线程池中的一部分。

Acceptor在accept时是通过阻塞的方式进行的

1
2
3
4
5
6
7
8
9
10
11
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
// 这里是阻塞的
SocketChannel channel = serverChannel.accept();
// 执行到这里时说明有请求进来了
accepted(channel);
}
}

accept成功后会调用accepted()函数

accepted()函数中会将SocketChannel设置为非阻塞模式,然后交给selector线程处理

1
2
3
4
5
6
7
8
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket);
// _manager是SelectorManager实例,里面管理了所有的selector线程
_manager.accept(channel);
}

以上,我们可以看到Acceptor线程实际上还是以阻塞的方式接受请求,这一块仍不能算NIO,selector线程才有真正意义上的NIO处理。

当然我们知道selector也能处理accept事件,jetty实际也支持通过selector来处理,只需要将acceptor数量设置为0即可,具体下面会讲到。

Selector

jetty中的selector线程主要用于监听channel上的read和write事件

jetty的selector线程由SelectorManager类管理,内部有一个_selectors的线程数组,

selector的线程类是ManagedSelector,是SelectorManager的内部类

SelectorManager是一个抽象类,它的具体实现类在ServerConnector中,叫ServerConnectorManager

在看selector代码时发现selector中也有处理Accept事件的代码,回头看了一下,发现当Acceptor的数量被设置为0时,jetty会直接使用selector来处理accept,但是貌似只会选取一个selector线程来处理accept事件

1
2
3
4
5
6
7
8
9
10
protected void doStart() throws Exception
{
super.doStart();
// 没有Acceptor,直接使用selector
if (getAcceptors()==0)
{
_acceptChannel.configureBlocking(false);
_manager.acceptor(_acceptChannel);
}
}

selector线程的主流程在SelectorManager中的run函数中select()函数

在select函数中,selector先处理_change队列中的一些任务(通常是更新在某个channel上坚监听的事件类型),处理完会将状态置为SELECT

之后会调用selector的select()函数进入阻塞状态,直至有事件ready

在阻塞期间,如果有新的select事件需要注册,则会进入_change队列等待

READ事件处理

一个比较令人在意的问题是,当channel上的READ事件就绪以后,jetty在什么地方将其中的数据读取出来呢?

在Jetty中,有Endpoint和Connection的概念,其中Endpoint是对连接端点的封装,Connection则是对连接本身的封装。

通常selector在注册监听事件时,会将channel和一个attachment绑定在一起,而这个attachment在jetty中通常是SelectableEndPoint的实例,它有一个onSelected方法用来处理IO事件就绪之后的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* <p>Process changes and waits on {@link Selector#select()}.</p>
*
* @see #submit(Runnable)
*/
public void select()
{
boolean debug = LOG.isDebugEnabled();
try
{
// ...
int selected = _selector.select(); // 阻塞直至有IO事件就绪
// ...
Set<SelectionKey> selectedKeys = _selector.selectedKeys();
for (SelectionKey key : selectedKeys)
{
if (key.isValid())
{
processKey(key); // 根据key的类型处理
}
else
{
// ...
}
}
selectedKeys.clear();
}
catch (Throwable x)
{
if (isRunning())
LOG.warn(x);
else
LOG.ignore(x);
}
}
private void processKey(SelectionKey key)
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
((SelectableEndPoint)attachment).onSelected(); // 调用onSelected
}
else if (key.isConnectable())
{
processConnect(key, (Connect)attachment);
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException();
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}

那么onSelected这个函数中做了什么呢,这个函数的实现在SelectChannelEndPoint中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onSelected()
{
assert _selector.isSelectorThread();
int oldInterestOps = _key.interestOps();
int readyOps = _key.readyOps();
int newInterestOps = oldInterestOps & ~readyOps;
setKeyInterests(oldInterestOps, newInterestOps);
updateLocalInterests(readyOps, false);
if (_key.isReadable())
getFillInterest().fillable(); //这里是关键代码
if (_key.isWritable())
getWriteFlusher().completeWrite();
}

可以看到getFillInterest().fillable()是关键代码,它最终会调用AbstractConnection中的ReadCallback的succeeded方法,具体调用链这里就不贴代码了,相对来说比较麻烦,读者可以自行跟到函数里面看。

那么succeeded方法做了什么呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private class ReadCallback implements Callback
{
@Override
public void succeeded()
{
while(true)
{
State state=_state.get();
if (next(state,state.onFillable()))
break;
}
}
//..
};

根据代码分析,它实际上是做了一个状态机状态的切换,通过分析AbstractionConnection这个类,我们可以发现其中维护了一个状态机用来表示当前这个Connection处理的状态,我们可以看一下这个状态机的基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static class State
{
private final String _name;
State(String name)
{
_name=name;
}
@Override
public String toString()
{
return _name;
}
void onEnter(AbstractConnection connection)
{
}
State fillInterested()
{
throw new IllegalStateException(this.toString());
}
State onFillable()
{
throw new IllegalStateException(this.toString());
}
State onFilled()
{
throw new IllegalStateException(this.toString());
}
State onFailed()
{
throw new IllegalStateException(this.toString());
}
}

可以看到,这个状态机在触发某些事件的时候会执行对应的函数,而前面提到succeeded函数实际上是从FILL_INTERESTED切换到FILLING(关于状态机具体如何切换这里就不多介绍),而在这一过程中会触发FILLING这个State的onEnter函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static final class FillingState extends State
{
private FillingState()
{
super("FILLING");
}
@Override
public void onEnter(AbstractConnection connection)
{
if (connection._executeOnfillable)
connection.getExecutor().execute(connection._runOnFillable);
else
connection._runOnFillable.run();
}
@Override
State fillInterested()
{
return FILLING_FILL_INTERESTED;
}
@Override
public State onFilled()
{
return IDLE;
}
}

可以看到根据_executeOnfillable参数的不同,会决定是从线程池中拿一个线程还是在当前线程执行_runOnFillable,首先我们先关注一下_runOnFillable做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final Runnable _runOnFillable = new Runnable()
{
@Override
public void run()
{
try
{
onFillable();
}
finally
{
while(true)
{
State state=_state.get();
if (next(state,state.onFilled()))
break;
}
}
}
};

这里调用了onFillable()函数,这在AbstractConnection中是一个抽象方法,它的实现之一在HttpConnection中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable {}", this, _channel.getState());
final HttpConnection last=setCurrentConnection(this);
int filled=Integer.MAX_VALUE;
boolean suspended=false;
try
{
// while not suspended and not upgraded
while (!suspended && getEndPoint().getConnection()==this)
{
// Do we need some data to parse
if (BufferUtil.isEmpty(_requestBuffer))
{
// If the previous iteration filled 0 bytes or saw a close, then break here
if (filled<=0)
break;
// Can we fill?
if(getEndPoint().isInputShutdown())
{
// No pretend we read -1
filled=-1;
_parser.atEOF();
}
else
{
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
_requestBuffer = getRequestBuffer();
// fill
filled = getEndPoint().fill(_requestBuffer);
if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
// tell parser
if (filled < 0)
_parser.atEOF();
}
}
// Parse the buffer
if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
{
// The parser returned true, which indicates the channel is ready to handle a request.
// Call the channel and this will either handle the request/response to completion OR,
// if the request suspends, the request/response will be incomplete so the outer loop will exit.
// Not that onFillable no longer manipulates the request buffer from this point and that is
// left to threads calling #completed or #parseContent (which may be this thread inside handle())
suspended = !_channel.handle();
}
else
{
// We parsed what we could, recycle the request buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
releaseRequestBuffer();
}
}
}
catch (EofException e)
{
LOG.debug(e);
}
catch (Exception e)
{
if (_parser.isIdle())
LOG.debug(e);
else
LOG.warn(this.toString(), e);
close();
}
finally
{
setCurrentConnection(last);
if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
{
fillInterested();
}
}
}

其中调用了getEndPoint().fill(_requestBuffer),这段代码的作用是从endpoint中读取数据填充到_requestBuffer中,前面说过,endpoint实际是连接端点的封装,也就是说这个函数会从socketChannel中读取数据,我们可以看一下ChannelEndpoint的fill函数来验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public int fill(ByteBuffer buffer) throws IOException
{
if (_ishut)
return -1;
int pos=BufferUtil.flipToFill(buffer);
try
{
int filled = _channel.read(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
LOG.debug("filled {} {}", filled, this);
if (filled>0)
notIdle();
else if (filled==-1)
shutdownInput();
return filled;
}
catch(IOException e)
{
LOG.debug(e);
shutdownInput();
return -1;
}
finally
{
BufferUtil.flipToFlush(buffer,pos);
}
}

可以看到确实调用了_channel.read(buffer)

前面提到,有个叫_executeOnfillable的参数控制从channel中读取数据这一过程是从线程池中拿一个线程来进行处理还是直接在当前线程(selector线程)处理,实际这个参数默认情况下是true,也就是说默认情况下都是在另外一个线程中进行IO操作的,selector线程通常情况下只是专注于监听IO事件的就绪而不会因为IO事件而阻塞。